// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl_test

import (
	"context"
	"fmt"
	"math"
	"strconv"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/pingcap/failpoint"
	"github.com/pingcap/tidb/pkg/ddl"
	"github.com/pingcap/tidb/pkg/meta"
	"github.com/pingcap/tidb/pkg/meta/model"
	"github.com/pingcap/tidb/pkg/parser/mysql"
	"github.com/pingcap/tidb/pkg/sessionctx/vardef"
	"github.com/pingcap/tidb/pkg/sessiontxn"
	"github.com/pingcap/tidb/pkg/testkit"
	"github.com/pingcap/tidb/pkg/testkit/external"
	"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
	"github.com/pingcap/tidb/pkg/util"
	"github.com/pingcap/tidb/pkg/util/mock"
	"github.com/stretchr/testify/require"
)

// batchInsert inserts the rows (i, i, i) for every i in [start, end) into tbl
// with one multi-row INSERT statement.
//
// The statement is run through tk.MustExec, so an execution error fails the
// calling test. The SQL text is assembled in a strings.Builder so that building
// thousands of rows does not re-copy the whole statement on every append.
func batchInsert(tk *testkit.TestKit, tbl string, start, end int) {
	var dml strings.Builder
	fmt.Fprintf(&dml, "insert into %s values", tbl)
	for i := start; i < end; i++ {
		// Separate row tuples with a comma; no leading or trailing comma.
		if i != start {
			dml.WriteByte(',')
		}
		fmt.Fprintf(&dml, "(%d, %d, %d)", i, i, i)
	}
	tk.MustExec(dml.String())
}

// TestModifyColumnReorgInfo checks that the reorg bookkeeping of a DDL job is
// gone once the job has ended: no row for the job in mysql.tidb_ddl_job or
// mysql.tidb_ddl_reorg, one row in mysql.tidb_ddl_history, and
// GetDDLReorgHandle reporting meta.ErrDDLReorgElementNotExist.
//
// Three scenarios are run against the same table while the MockGetIndexRecordErr
// failpoint is enabled with different arguments: a failing "modify column", a
// succeeding "modify column", and a succeeding "add index".
func TestModifyColumnReorgInfo(t *testing.T) {
	store := testkit.CreateMockStore(t)

	// Lower the DDL error count limit for this test and restore it afterwards.
	limit := vardef.GetDDLErrorCountLimit()
	vardef.SetDDLErrorCountLimit(5)
	defer func() {
		vardef.SetDDLErrorCountLimit(limit)
	}()
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("drop table if exists t1")
	tk.MustExec("create table t1 (c1 int, c2 int, c3 int, index idx(c2), index idx1(c1, c2));")

	sql := "alter table t1 change c2 c2 varchar(16);"
	// defaultBatchSize is equal to ddl.defaultBatchSize
	base := defaultBatchSize * 8
	// add some rows
	batchInsert(tk, "t1", 0, base)
	// Make sure the count of regions more than backfill workers.
	tk.MustQuery("split table t1 between (0) and (8192) regions 8;").Check(testkit.Rows("8 1"))

	tbl := external.GetTableByName(t, tk, "test", "t1")

	// State shared between the failpoint hooks below and the assertions that
	// follow each DDL statement.
	var checkErr error
	var currJob *model.Job
	var elements []*meta.Element
	ctx := mock.NewContext()
	ctx.Store = store
	times := 0
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeRunOneJobStep", func(job *model.Job) {
		// Only look at jobs on t1 that are in the write-reorganization state.
		if tbl.Meta().ID != job.TableID || checkErr != nil || job.SchemaState != model.StateWriteReorganization {
			return
		}
		if job.Type == model.ActionModifyColumn {
			if times == 0 {
				times++
			}
		}
		if job.Type == model.ActionAddIndex {
			// Skip the first matching add-index step after a modify-column step
			// was seen; on later steps record idx2 as the expected element.
			if times == 1 {
				times++
				return
			}
			tbl := external.GetTableByName(t, tk, "test", "t1")
			indexInfo := tbl.Meta().FindIndexByName("idx2")
			elements = []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}}
		}
	})

	// Capture the modify-column job and the elements built from its changing
	// column and changing indexes while the job is in the delete-only state.
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/modifyColumnTypeWithData", func(job *model.Job, args model.JobArgs) {
		if tbl.Meta().ID == job.TableID &&
			checkErr == nil &&
			job.SchemaState == model.StateDeleteOnly &&
			job.Type == model.ActionModifyColumn {
			currJob = job
			a := args.(*model.ModifyColumnArgs)
			elements = ddl.BuildElements(a.ChangingColumn, a.ChangingIdxs)
		}
	})

	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/MockGetIndexRecordErr", `return("cantDecodeRecordErr")`))
	err := tk.ExecToErr(sql)
	require.EqualError(t, err, "[ddl:8202]Cannot decode index value, because mock can't decode record error")
	require.NoError(t, checkErr)
	// Check whether the reorg information is cleaned up when executing "modify column" failed.
	checkReorgHandle := func(gotElements, expectedElements []*meta.Element) {
		require.Equal(t, len(expectedElements), len(gotElements))
		for i, e := range gotElements {
			require.Equal(t, expectedElements[i], e)
		}
		// check the consistency of the tables.
		// NOTE(review): currJob is only assigned by the modifyColumnTypeWithData
		// hook, so during the add-index scenario it still refers to the last
		// modify-column job — confirm this is intended.
		currJobID := strconv.FormatInt(currJob.ID, 10)
		tk.MustQuery("select job_id, reorg, schema_ids, table_ids, type, processing from mysql.tidb_ddl_job where job_id = " + currJobID).Check(testkit.Rows())
		tk.MustQuery("select job_id from mysql.tidb_ddl_history where job_id = " + currJobID).Check(testkit.Rows(currJobID))
		tk.MustQuery("select job_id, ele_id, ele_type, physical_id from mysql.tidb_ddl_reorg where job_id = " + currJobID).Check(testkit.Rows())
		require.NoError(t, sessiontxn.NewTxn(context.Background(), ctx))
		e, start, end, physicalID, err := ddl.NewReorgHandlerForTest(testkit.NewTestKit(t, store).Session()).GetDDLReorgHandle(currJob)
		// The lookup must fail with ErrDDLReorgElementNotExist and return zero values.
		require.Error(t, err, "Error not ErrDDLReorgElementNotExists, found orphan row in tidb_ddl_reorg for job.ID %d: e: '%s', physicalID: %d, start: 0x%x end: 0x%x", currJob.ID, e, physicalID, start, end)
		require.True(t, meta.ErrDDLReorgElementNotExist.Equal(err))
		require.Nil(t, e)
		require.Nil(t, start)
		require.Nil(t, end)
		require.Zero(t, physicalID)
	}
	expectedElements := []*meta.Element{
		{ID: 4, TypeKey: meta.ColumnElementKey},
		{ID: 3, TypeKey: meta.IndexElementKey},
		{ID: 4, TypeKey: meta.IndexElementKey}}
	checkReorgHandle(elements, expectedElements)
	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/MockGetIndexRecordErr"))
	tk.MustExec("admin check table t1")

	// Check whether the reorg information is cleaned up when executing "modify column" successfully.
	// Test encountering a "notOwnerErr" error which caused the processing backfill job to exit halfway.
	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/MockGetIndexRecordErr", `return("modifyColumnNotOwnerErr")`))
	tk.MustExec(sql)
	expectedElements = []*meta.Element{
		{ID: 5, TypeKey: meta.ColumnElementKey},
		{ID: 5, TypeKey: meta.IndexElementKey},
		{ID: 6, TypeKey: meta.IndexElementKey}}
	checkReorgHandle(elements, expectedElements)
	tk.MustExec("admin check table t1")
	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/MockGetIndexRecordErr"))

	// Test encountering a "notOwnerErr" error which caused the processing backfill job to exit halfway.
	// During the period, the old TiDB version(do not exist the element information) is upgraded to the new TiDB version.
	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/MockGetIndexRecordErr", `return("addIdxNotOwnerErr")`))
	tk.MustExec("alter table t1 add index idx2(c1)")
	expectedElements = []*meta.Element{
		{ID: 7, TypeKey: meta.IndexElementKey}}
	checkReorgHandle(elements, expectedElements)
	tk.MustExec("admin check table t1")
	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/MockGetIndexRecordErr"))
}

// TestModifyColumnNullToNotNullWithChangingVal2 verifies that changing a
// nullable column to NOT NULL is rejected with error 1138 when a NULL row is
// inserted from the beforeDoModifyColumnSkipReorgCheck failpoint hook.
func TestModifyColumnNullToNotNullWithChangingVal2(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")

	// The hook uses its own session to sneak a NULL row into the table.
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeDoModifyColumnSkipReorgCheck", func() {
		testkit.NewTestKit(t, store).MustExec("insert into test.tt values (NULL, NULL)")
	})

	setup := []string{
		"drop table if exists tt;",
		`create table tt (a bigint, b int);`,
		"insert into tt values (1,1),(2,2),(3,3);",
	}
	for _, stmt := range setup {
		tk.MustExec(stmt)
	}

	ddlErr := tk.ExecToErr("alter table tt modify a int not null;")
	require.EqualError(t, ddlErr, "[ddl:1138]Invalid use of NULL value")
	tk.MustExec("drop table tt")
}

// TestModifyColumnNullToNotNull covers "change c2 c2 int not null" on a
// nullable column in two situations:
//
//  1. a row with NULLs is inserted by a second session on the first job step
//     seen for the table, and the DDL must fail with error 1138;
//  2. the second session inserts while the job is running, the insert must be
//     rejected with error 1048, and the DDL must succeed.
func TestModifyColumnNullToNotNull(t *testing.T) {
	const stepHook = "github.com/pingcap/tidb/pkg/ddl/beforeRunOneJobStep"

	store := testkit.CreateMockStoreWithSchemaLease(t, 600*time.Millisecond)
	tk1, tk2 := testkit.NewTestKit(t, store), testkit.NewTestKit(t, store)
	tk1.MustExec("use test")
	tk2.MustExec("use test")
	tk1.MustExec("create table t1 (c1 int, c2 int)")

	tableID := external.GetTableByName(t, tk1, "test", "t1").Meta().ID

	var (
		hookErr   error
		firstStep sync.Once
	)

	// Scenario 1: a NULL row slips in before the job makes its first update.
	tk1.MustExec("delete from t1")
	testfailpoint.EnableCall(t, stepHook, func(job *model.Job) {
		if job.TableID == tableID {
			firstStep.Do(func() {
				hookErr = tk2.ExecToErr("insert into t1 values ()")
			})
		}
	})
	ddlErr := tk1.ExecToErr("alter table t1 change c2 c2 int not null")
	require.NoError(t, hookErr)
	require.EqualError(t, ddlErr, "[ddl:1138]Invalid use of NULL value")
	tk1.MustQuery("select * from t1").Check(testkit.Rows("<nil> <nil>"))

	// Scenario 2: inserts issued while the job is running must be refused.
	tk1.MustExec("delete from t1")
	testfailpoint.EnableCall(t, stepHook, func(job *model.Job) {
		if job.TableID != tableID || job.State != model.JobStateRunning {
			return
		}
		// c2 is expected to carry PreventNullInsertFlag by now, so this fails.
		hookErr = tk2.ExecToErr("insert into t1 values ()")
	})
	tk1.MustExec("alter table t1 change c2 c2 int not null")
	require.EqualError(t, hookErr, "[table:1048]Column 'c2' cannot be null")

	// After the DDL the column is NOT NULL and the temporary flag is cleared.
	col := external.GetModifyColumn(t, tk1, "test", "t1", "c2", false)
	flag := col.GetFlag()
	require.True(t, mysql.HasNotNullFlag(flag))
	require.False(t, mysql.HasPreventNullInsertFlag(flag))
	ddlErr = tk1.ExecToErr("insert into t1 values ();")
	require.EqualError(t, ddlErr, "[table:1364]Field 'c2' doesn't have a default value")
}

// TestModifyColumnNullToNotNullWithChangingVal covers
// "change c2 c2 tinyint not null" on a nullable int column:
//
//  1. a row with NULLs is inserted by a second session on the first job step
//     seen for the table, and the DDL must fail with error 1138;
//  2. an insert issued from the afterModifyColumnStateDeleteOnly hook must be
//     rejected with error 1048, and the DDL must succeed, leaving c2 as a
//     NOT NULL tinyint without PreventNullInsertFlag.
func TestModifyColumnNullToNotNullWithChangingVal(t *testing.T) {
	store := testkit.CreateMockStoreWithSchemaLease(t, 600*time.Millisecond)
	tk1 := testkit.NewTestKit(t, store)
	tk2 := testkit.NewTestKit(t, store)

	tk1.MustExec("use test")
	tk2.MustExec("use test")

	tk1.MustExec("create table t1 (c1 int, c2 int)")

	tbl := external.GetTableByName(t, tk1, "test", "t1")

	// Check insert null before job first update.
	tk1.MustExec("delete from t1")
	once := sync.Once{}
	var checkErr error
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeRunOneJobStep", func(job *model.Job) {
		if tbl.Meta().ID != job.TableID {
			return
		}
		once.Do(func() {
			// Insert null value to make modify column fail. The result is
			// recorded and asserted after the DDL returns, so the check below
			// on checkErr is meaningful.
			checkErr = tk2.ExecToErr("insert into t1 values ()")
		})
	})
	err := tk1.ExecToErr("alter table t1 change c2 c2 tinyint not null")
	require.NoError(t, checkErr)
	require.EqualError(t, err, "[ddl:1138]Invalid use of NULL value")
	tk1.MustQuery("select * from t1").Check(testkit.Rows("<nil> <nil>"))

	// Check insert error when column has PreventNullInsertFlag.
	tk1.MustExec("delete from t1")
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterModifyColumnStateDeleteOnly", func(_ int64) {
		// Assert on the error returned by this very insert. A local variable is
		// used so the outer err/checkErr are not clobbered by the hook.
		insertErr := tk2.ExecToErr("insert into t1 values ()")
		require.EqualError(t, insertErr, "[table:1048]Column 'c2' cannot be null")
	})
	tk1.MustExec("alter table t1 change c2 c2 tinyint not null")

	c2 := external.GetModifyColumn(t, tk1, "test", "t1", "c2", false)
	require.True(t, mysql.HasNotNullFlag(c2.GetFlag()))
	require.False(t, mysql.HasPreventNullInsertFlag(c2.GetFlag()))
	require.EqualError(t, tk1.ExecToErr("insert into t1 values ()"), "[table:1364]Field 'c2' doesn't have a default value")

	c2 = external.GetModifyColumn(t, tk1, "test", "t1", "c2", false)
	require.Equal(t, mysql.TypeTiny, c2.FieldType.GetType())
}

// TestModifyColumnBetweenStringTypes walks one column through a series of
// string-like type changes (varchar, char, binary, varbinary, text, set, enum).
//
// For each step it checks either that the ALTER succeeds and the stored values
// and column metadata are as expected, or that a shrinking change fails with
// "[types:1265]Data truncated". The last step repeats a truncating change with
// an empty sql_mode and expects a warning instead of an error.
func TestModifyColumnBetweenStringTypes(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")

	// varchar to varchar
	tk.MustExec("create table tt (a varchar(10));")
	tk.MustExec("insert into tt values ('111'),('10000');")
	tk.MustExec("alter table tt change a a varchar(5);")
	mvc := external.GetModifyColumn(t, tk, "test", "tt", "a", false)
	require.Equal(t, 5, mvc.FieldType.GetFlen())
	tk.MustQuery("select * from tt").Check(testkit.Rows("111", "10000"))
	tk.MustGetErrMsg("alter table tt change a a varchar(4);", "[types:1265]Data truncated for column 'a', value is '10000'")
	tk.MustExec("alter table tt change a a varchar(100);")
	tk.MustQuery("select length(a) from tt").Check(testkit.Rows("3", "5"))

	// char to char
	tk.MustExec("drop table if exists tt;")
	tk.MustExec("create table tt (a char(10));")
	tk.MustExec("insert into tt values ('111'),('10000');")
	tk.MustExec("alter table tt change a a char(5);")
	mc := external.GetModifyColumn(t, tk, "test", "tt", "a", false)
	require.Equal(t, 5, mc.FieldType.GetFlen())
	tk.MustQuery("select * from tt").Check(testkit.Rows("111", "10000"))
	tk.MustGetErrMsg("alter table tt change a a char(4);", "[types:1265]Data truncated for column 'a', value is '10000'")
	tk.MustExec("alter table tt change a a char(100);")
	tk.MustQuery("select length(a) from tt").Check(testkit.Rows("3", "5"))

	// binary to binary
	// The stored values are zero-padded to the column length, so shrinking
	// binary(10) is expected to fail even for the 3-byte input '111'.
	tk.MustExec("drop table if exists tt;")
	tk.MustExec("create table tt (a binary(10));")
	tk.MustExec("insert into tt values ('111'),('10000');")
	tk.MustGetErrMsg("alter table tt change a a binary(5);", "[types:1265]Data truncated for column 'a', value is '111\x00\x00\x00\x00\x00\x00\x00'")
	mb := external.GetModifyColumn(t, tk, "test", "tt", "a", false)
	require.Equal(t, 10, mb.FieldType.GetFlen())
	tk.MustQuery("select * from tt").Check(testkit.Rows("111\x00\x00\x00\x00\x00\x00\x00", "10000\x00\x00\x00\x00\x00"))
	tk.MustGetErrMsg("alter table tt change a a binary(4);", "[types:1265]Data truncated for column 'a', value is '111\x00\x00\x00\x00\x00\x00\x00'")
	tk.MustExec("alter table tt change a a binary(12);")
	tk.MustQuery("select * from tt").Check(testkit.Rows("111\x00\x00\x00\x00\x00\x00\x00\x00\x00", "10000\x00\x00\x00\x00\x00\x00\x00"))
	tk.MustQuery("select length(a) from tt").Check(testkit.Rows("12", "12"))

	// varbinary to varbinary
	tk.MustExec("drop table if exists tt;")
	tk.MustExec("create table tt (a varbinary(10));")
	tk.MustExec("insert into tt values ('111'),('10000');")
	tk.MustExec("alter table tt change a a varbinary(5);")
	mvb := external.GetModifyColumn(t, tk, "test", "tt", "a", false)
	require.Equal(t, 5, mvb.FieldType.GetFlen())
	tk.MustQuery("select * from tt").Check(testkit.Rows("111", "10000"))
	tk.MustGetErrMsg("alter table tt change a a varbinary(4);", "[types:1265]Data truncated for column 'a', value is '10000'")
	tk.MustExec("alter table tt change a a varbinary(12);")
	tk.MustQuery("select * from tt").Check(testkit.Rows("111", "10000"))
	tk.MustQuery("select length(a) from tt").Check(testkit.Rows("3", "5"))

	// varchar to char
	tk.MustExec("drop table if exists tt;")
	tk.MustExec("create table tt (a varchar(10));")
	tk.MustExec("insert into tt values ('111'),('10000');")

	tk.MustExec("alter table tt change a a char(10);")
	c2 := external.GetModifyColumn(t, tk, "test", "tt", "a", false)
	require.Equal(t, mysql.TypeString, c2.FieldType.GetType())
	require.Equal(t, 10, c2.FieldType.GetFlen())
	tk.MustQuery("select * from tt").Check(testkit.Rows("111", "10000"))
	tk.MustGetErrMsg("alter table tt change a a char(4);", "[types:1265]Data truncated for column 'a', value is '10000'")

	// char to text
	tk.MustExec("alter table tt change a a text;")
	c2 = external.GetModifyColumn(t, tk, "test", "tt", "a", false)
	require.Equal(t, mysql.TypeBlob, c2.FieldType.GetType())

	// text to set
	// A set that lacks one of the stored values must be rejected.
	tk.MustGetErrMsg("alter table tt change a a set('111', '2222');", "[types:1265]Data truncated for column 'a', value is '10000'")
	tk.MustExec("alter table tt change a a set('111', '10000');")
	c2 = external.GetModifyColumn(t, tk, "test", "tt", "a", false)
	require.Equal(t, mysql.TypeSet, c2.FieldType.GetType())
	tk.MustQuery("select * from tt").Check(testkit.Rows("111", "10000"))

	// set to set
	tk.MustExec("alter table tt change a a set('10000', '111');")
	c2 = external.GetModifyColumn(t, tk, "test", "tt", "a", false)
	require.Equal(t, mysql.TypeSet, c2.FieldType.GetType())
	tk.MustQuery("select * from tt").Check(testkit.Rows("111", "10000"))

	// set to enum
	tk.MustGetErrMsg("alter table tt change a a enum('111', '2222');", "[types:1265]Data truncated for column 'a', value is '10000'")
	tk.MustExec("alter table tt change a a enum('111', '10000');")
	c2 = external.GetModifyColumn(t, tk, "test", "tt", "a", false)
	require.Equal(t, mysql.TypeEnum, c2.FieldType.GetType())
	tk.MustQuery("select * from tt").Check(testkit.Rows("111", "10000"))
	// Reordering the enum members changes the numeric index of each value.
	tk.MustExec("alter table tt change a a enum('10000', '111');")
	tk.MustQuery("select * from tt where a = 1").Check(testkit.Rows("10000"))
	tk.MustQuery("select * from tt where a = 2").Check(testkit.Rows("111"))

	// no-strict mode
	// With an empty sql_mode the truncating change succeeds with a warning.
	tk.MustExec(`set @@sql_mode="";`)
	tk.MustExec("alter table tt change a a enum('111', '2222');")
	tk.MustQuery("show warnings").Check(testkit.RowsWithSep("|", "Warning|1265|Data truncated for column 'a', value is '10000'"))

	tk.MustExec("drop table tt;")
}

// TestModifyColumnCharset checks how "modify column" without an explicit
// charset affects columns that were created with charset utf8, by comparing
// the output of "show create table" before and after the changes.
func TestModifyColumnCharset(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("create table t_mcc(a varchar(8) charset utf8, b varchar(8) charset utf8)")

	// Both columns start out with an explicit utf8 charset.
	wantInitial := "t_mcc CREATE TABLE `t_mcc` (\n" +
		"  `a` varchar(8) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,\n" +
		"  `b` varchar(8) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL\n" +
		") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"
	tk.MustQuery(`show create table t_mcc`).Check(testkit.Rows(wantInitial))

	tk.MustExec("alter table t_mcc modify column a varchar(8);")

	// Mark the table info as TableInfoVersion0 before touching column b. With
	// that version the statement below leaves b's charset untouched, which is
	// not compatible with MySQL.
	tableInfo := external.GetTableByName(t, tk, "test", "t_mcc").Meta()
	tableInfo.Version = model.TableInfoVersion0
	tk.MustExec("alter table t_mcc modify column b varchar(8);")

	wantFinal := "t_mcc CREATE TABLE `t_mcc` (\n" +
		"  `a` varchar(8) DEFAULT NULL,\n" +
		"  `b` varchar(8) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL\n" +
		") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"
	tk.MustQuery(`show create table t_mcc`).Check(testkit.Rows(wantFinal))
}

// TestModifyColumnTime converts a TIME column to DATE, DATETIME and TIMESTAMP
// for a range of TIME literals. The expected results are derived from today's
// date in UTC plus the day/hour/second offsets encoded in each literal.
func TestModifyColumnTime(t *testing.T) {
	const (
		dateLayout     = "2006-01-02"
		datetimeLayout = "2006-01-02 15:04:05"
	)

	utcNow := time.Now().UTC()
	// Midnight of the current UTC day, and the same instant 30 days later.
	today := time.Date(utcNow.Year(), utcNow.Month(), utcNow.Day(), 0, 0, 0, 0, time.UTC)
	later := today.AddDate(0, 0, 30)

	dateToday := today.Format(dateLayout)
	dateLater := later.Format(dateLayout)

	dtTodayHMS := today.Add(20*time.Hour + 12*time.Second).Format(datetimeLayout)
	dtTodayH := today.Add(20 * time.Hour).Format(datetimeLayout)
	dtTodayS := today.Add(12 * time.Second).Format(datetimeLayout)
	dtLaterHMS := later.Add(20*time.Hour + 12*time.Second).Format(datetimeLayout)
	dtLaterH := later.Add(20 * time.Hour).Format(datetimeLayout)

	// TIMESTAMP expectations are the same strings as the DATETIME ones.
	tsTodayHMS, tsTodayH, tsTodayS := dtTodayHMS, dtTodayH, dtTodayS
	tsLaterHMS, tsLaterH := dtLaterHMS, dtLaterH

	cases := []testModifyColumnTimeCase{
		// time to date
		{"time", `"30 20:00:12"`, "date", dateLater, 0},
		{"time", `"30 20:00"`, "date", dateLater, 0},
		{"time", `"30 20"`, "date", dateLater, 0},
		{"time", `"20:00:12"`, "date", dateToday, 0},
		{"time", `"20:00"`, "date", dateToday, 0},
		{"time", `"12"`, "date", dateToday, 0},
		{"time", `"200012"`, "date", dateToday, 0},
		{"time", `200012`, "date", dateToday, 0},
		{"time", `0012`, "date", dateToday, 0},
		{"time", `12`, "date", dateToday, 0},
		{"time", `"30 20:00:12.498"`, "date", dateLater, 0},
		{"time", `"20:00:12.498"`, "date", dateToday, 0},
		{"time", `"200012.498"`, "date", dateToday, 0},
		{"time", `200012.498`, "date", dateToday, 0},
		// time to datetime
		{"time", `"30 20:00:12"`, "datetime", dtLaterHMS, 0},
		{"time", `"30 20:00"`, "datetime", dtLaterH, 0},
		{"time", `"30 20"`, "datetime", dtLaterH, 0},
		{"time", `"20:00:12"`, "datetime", dtTodayHMS, 0},
		{"time", `"20:00"`, "datetime", dtTodayH, 0},
		{"time", `"12"`, "datetime", dtTodayS, 0},
		{"time", `"200012"`, "datetime", dtTodayHMS, 0},
		{"time", `200012`, "datetime", dtTodayHMS, 0},
		{"time", `0012`, "datetime", dtTodayS, 0},
		{"time", `12`, "datetime", dtTodayS, 0},
		{"time", `"30 20:00:12.498"`, "datetime", dtLaterHMS, 0},
		{"time", `"20:00:12.498"`, "datetime", dtTodayHMS, 0},
		{"time", `"200012.498"`, "datetime", dtTodayHMS, 0},
		{"time", `200012.498`, "datetime", dtTodayHMS, 0},
		// time to timestamp
		{"time", `"30 20:00:12"`, "timestamp", tsLaterHMS, 0},
		{"time", `"30 20:00"`, "timestamp", tsLaterH, 0},
		{"time", `"30 20"`, "timestamp", tsLaterH, 0},
		{"time", `"20:00:12"`, "timestamp", tsTodayHMS, 0},
		{"time", `"20:00"`, "timestamp", tsTodayH, 0},
		{"time", `"12"`, "timestamp", tsTodayS, 0},
		{"time", `"200012"`, "timestamp", tsTodayHMS, 0},
		{"time", `200012`, "timestamp", tsTodayHMS, 0},
		{"time", `0012`, "timestamp", tsTodayS, 0},
		{"time", `12`, "timestamp", tsTodayS, 0},
		{"time", `"30 20:00:12.498"`, "timestamp", tsLaterHMS, 0},
		{"time", `"20:00:12.498"`, "timestamp", tsTodayHMS, 0},
		{"time", `"200012.498"`, "timestamp", tsTodayHMS, 0},
		{"time", `200012.498`, "timestamp", tsTodayHMS, 0},
	}
	testModifyColumnTime(t, cases)
}

// testModifyColumnTimeCase describes one column-type conversion exercised by
// testModifyColumnTime. Cases are written as positional literals, so the field
// order must not change.
type testModifyColumnTimeCase struct {
	from   string // column type used in "create table t_mc(a <from>)"
	value  string // SQL literal inserted into column a before the ALTER
	to     string // target type used in "alter table t_mc modify a <to>"
	expect string // expected single row of "select a from t_mc" when err is 0
	err    uint16 // when non-zero, the ALTER must fail; the value is interpolated into the error pattern
}

// testModifyColumnTime runs every case in tests against a fresh mock store.
//
// For each case it recreates table t_mc with a single column of type
// test.from, inserts test.value, and alters the column to test.to. When
// test.err is non-zero the ALTER must fail with an error message carrying that
// error code in the "[component:code]" prefix; otherwise the ALTER must succeed
// and the column must read back as test.expect.
//
// The session runs with time_zone=UTC and a global DDL error count limit of 3;
// both are reset to their defaults when the function returns.
func testModifyColumnTime(t *testing.T, tests []testModifyColumnTimeCase) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("set @@global.tidb_ddl_error_count_limit = 3")
	tk.MustExec("set @@time_zone=UTC")

	defer func() {
		tk.MustExec("set @@global.tidb_ddl_error_count_limit = default")
		tk.MustExec("set @@time_zone=default")
	}()

	for _, test := range tests {
		comment := fmt.Sprintf("%+v", test)
		tk.MustExec("drop table if exists t_mc")
		tk.MustExec(fmt.Sprintf("create table t_mc(a %s)", test.from))
		tk.MustExec(fmt.Sprintf(`insert into t_mc (a) values (%s)`, test.value))
		// ExecToErr is used instead of Exec so no result set is left dangling.
		err := tk.ExecToErr(fmt.Sprintf(`alter table t_mc modify a %s`, test.to))
		if test.err != 0 {
			require.Error(t, err, comment)
			// The brackets are escaped so they match literally. Unescaped,
			// "[ddl:N]" is a character class that matches nearly any message.
			// Any component name is accepted so that conversion errors raised
			// outside the ddl component still match on their code.
			require.Regexp(t, fmt.Sprintf(`\[\w+:%d\]`, test.err), err.Error(), comment)
			continue
		}
		require.NoError(t, err, comment)
		tk.MustQuery("select a from t_mc").Check(testkit.Rows(test.expect))
	}
}

// TestModifyColumnTypeWhenInterception checks that truncation warnings raised
// while modifying a column type are still reported in full when the reorg is
// interrupted (here through the MockReorgTimeoutInOneRegion failpoint).
func TestModifyColumnTypeWhenInterception(t *testing.T) {
	store := testkit.CreateMockStore(t)

	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")

	// Test normal warnings.
	tk.MustExec("create table t(a int primary key, b decimal(4,2))")

	// Fill the table with rows whose decimal value loses precision under the
	// narrower target type.
	rowCount := defaultBatchSize * 4
	tuples := make([]string, 0, rowCount)
	for id := 1; id <= rowCount; id++ {
		tuples = append(tuples, fmt.Sprintf("(%d, %f)", id, 11.22))
	}
	tk.MustExec("insert into t values" + strings.Join(tuples, ","))

	// Make the regions scale like: [1, 1024), [1024, 2048), [2048, 3072), [3072, 4096]
	tk.MustQuery("split table t between(0) and (4096) regions 4")

	testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/MockReorgTimeoutInOneRegion", `return(true)`)
	tk.MustExec("alter table t modify column b decimal(3,1)")

	warnings := tk.MustQuery("show warnings")
	warnings.Check(testkit.Rows("Warning 1292 4096 warnings with this error code, first warning: Truncated incorrect DECIMAL value: '11.22'"))
}

// TestModifyColumnWithIndexesWriteConflict runs "modify column" on an indexed
// column while concurrent DML touches the same table, then checks that the
// table and its indexes are consistent.
//
// Three extra sessions are driven from failpoint hooks:
//   - tk2 deletes row id=1 after the reorg work of the modify-column job;
//   - tk1 updates that same row from inside the record-removal path, producing
//     a write conflict for the delete;
//   - tk3 inserts a new row after schema diffs are loaded.
//
// Every hook is registered through testfailpoint so it is disabled when the
// test finishes and cannot fire in later tests.
func TestModifyColumnWithIndexesWriteConflict(t *testing.T) {
	testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/disableLossyDDLOptimization", "return(true)")

	store := testkit.CreateMockStore(t)

	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("set @@global.tidb_general_log=1;")
	tk.MustExec(`
		CREATE TABLE t (
			id int NOT NULL AUTO_INCREMENT,
			val0 varchar(16) NOT NULL,
			val1 int NOT NULL,
			padding varchar(256) NOT NULL DEFAULT '',
			PRIMARY KEY (id)
		);
	`)
	tk.MustExec("CREATE INDEX val0_idx ON t (val0)")
	tk.MustExec("insert into t (val0, val1, padding) values ('1', 1, 'a'), ('2', 2, 'b'), ('3', 3, 'c')")

	conflictOnce := sync.Once{}
	conflictCh := make(chan struct{})
	tk1 := testkit.NewTestKit(t, store)
	// Registered via testfailpoint (not failpoint.EnableCall directly) so the
	// enable error is checked and the hook is removed at test cleanup.
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/table/tables/duringTableCommonRemoveRecord", func(tblInfo *model.TableInfo) {
		if tblInfo.Name.L == "t" {
			conflictOnce.Do(func() {
				tk1.MustExec("use test")
				// inject a write conflict for the delete DML.
				tk1.MustExec("update t set val0 = '100' where id = 1;")
				close(conflictCh)
			})
		}
	})
	deleteOnce := sync.Once{}
	insertOnce := sync.Once{}
	tk2 := testkit.NewTestKit(t, store)
	tk3 := testkit.NewTestKit(t, store)
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterReorgWorkForModifyColumn", func() {
		deleteOnce.Do(func() {
			// The delete runs in its own goroutine because this hook blocks on
			// conflictCh until the conflicting update has been issued.
			go func() {
				tk2.MustExec("use test")
				tk2.MustExec("delete from t where id = 1;")
			}()
			testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/infoschema/issyncer/afterLoadSchemaDiffs", func(int64) {
				insertOnce.Do(func() {
					tk3.MustExec("use test")
					tk3.MustExec("insert into t (val0, val1, padding) values ('4', 4, 'd');")
				})
			})
			<-conflictCh
		})
	})
	tk.MustExec("alter table t modify column val0 varchar(8) not null;")
	tk.MustExec("admin check table t;")
	tk.MustQuery("select * from t order by id;").Check(testkit.Rows(
		"2 2 2 b",
		"3 3 3 c",
		"4 4 4 d"))
}

// TestMultiSchemaModifyColumnWithSkipReorg modifies two indexed columns in a
// single multi-schema ALTER (each repositioned relative to the other) and
// checks that the table stays consistent and that the column at offset 1 keeps
// its column ID.
func TestMultiSchemaModifyColumnWithSkipReorg(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)

	for _, stmt := range []string{
		"use test",
		"create table t(a varchar(16), b bigint, c bigint, index i1(a), index i2(b), index i3(c), index i4(a, b))",
		"insert into t values ('a  ', 1, 1), ('b  ', 2, 2), ('c ', 3, 3)",
	} {
		tk.MustExec(stmt)
	}
	before := external.GetTableByName(t, tk, "test", "t").Meta()

	tk.MustExec("alter table t modify column a char(8) after b, modify column b int after a")
	tk.MustExec("admin check table t")
	after := external.GetTableByName(t, tk, "test", "t").Meta()

	// Column b sits at offset 1 before and after; its ID must not change.
	wantID, gotID := before.Columns[1].ID, after.Columns[1].ID
	require.Equal(t, wantID, gotID)
}

// TestModifyColumnWithSkipReorg exercises "modify column" paths that are
// expected to avoid a full reorg, with DML injected through failpoint hooks:
//
//   - INT -> MEDIUMINT NOT NULL: an out-of-range insert after the skip-reorg
//     check must fail, and the column keeps its ID;
//   - INT -> TINYINT NOT NULL: an out-of-range row inserted before the check
//     makes the ALTER fail;
//   - VARCHAR -> CHAR: the modify type reported through getModifyColumnType is
//     ModifyTypeReorg when values carry trailing spaces and
//     ModifyTypeNoReorgWithCheck when they do not.
func TestModifyColumnWithSkipReorg(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")

	// INT -> MEDIUMINT
	tk.MustExec("create table t(a int, b int, index i1(a), index i2(b), index i3(a, b))")
	tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3)")
	oldMeta := external.GetTableByName(t, tk, "test", "t").Meta()

	// insert should fail by new column type check
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterDoModifyColumnSkipReorgCheck", func() {
		tk2 := testkit.NewTestKit(t, store)
		tk2.MustExecToErr("insert into test.t values (2147483648, 2147483648)")
	})
	tk.MustExec("alter table t modify column b mediumint not null")
	testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/ddl/afterDoModifyColumnSkipReorgCheck")
	newMeta := external.GetTableByName(t, tk, "test", "t").Meta()

	// ID should be the same.
	require.Equal(t, oldMeta.Columns[1].ID, newMeta.Columns[1].ID)
	require.Nil(t, newMeta.Columns[1].ChangingFieldType)
	tk.MustExec("admin check table t")

	// insert should succeed before adding flag, and this will make modify column fail.
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeDoModifyColumnSkipReorgCheck", func() {
		tk2 := testkit.NewTestKit(t, store)
		tk2.MustExec("insert into test.t values (512, 512)")
	})
	tk.MustExecToErr("alter table t modify column b tinyint not null")
	// This hook inserts two values into test.t. Disable it before t is
	// recreated with a single column below, otherwise a later ALTER that
	// reaches the same failpoint would run the stale insert against the new
	// table.
	testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/ddl/beforeDoModifyColumnSkipReorgCheck")

	// VARCHAR -> CHAR
	var gotTp byte
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/getModifyColumnType", func(tp byte) {
		gotTp = tp
	})

	// Values with trailing spaces: a reorg is expected.
	tk.MustExec("drop table if exists t")
	tk.MustExec("create table t (a varchar(10))")
	tk.MustExec("insert into t values ('a '), ('b ')")
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/modifyColumnTypeWithData", func(*model.Job, model.JobArgs) {
		tk2 := testkit.NewTestKit(t, store)
		tk2.MustExec("use test")
		tk2.MustExec("insert into t values ('a ')")
	})
	tk.MustExec("alter table t modify column a char(5)")
	tk.MustExec("admin check table t")
	require.Equal(t, ddl.ModifyTypeReorg, gotTp)

	// Values without trailing spaces: no reorg, but inserts of values with
	// trailing spaces after the check must be rejected.
	tk.MustExec("drop table if exists t;")
	tk.MustExec("create table t (a varchar(10))")
	tk.MustExec("insert into t values ('a'), ('b')")
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterDoModifyColumnSkipReorgCheck", func() {
		tk2 := testkit.NewTestKit(t, store)
		tk2.MustExec("use test")
		tk2.MustExecToErr("insert into t values ('a ')")
	})
	tk.MustExec("alter table t modify column a char(5)")
	tk.MustExec("admin check table t")
	require.Equal(t, ddl.ModifyTypeNoReorgWithCheck, gotTp)
}

// TestGetModifyColumnType checks which modify type the DDL layer reports (via
// the getModifyColumnType failpoint) for a set of column type changes.
//
// Each case creates table t with column a of beforeType (optionally indexed and
// with a composite primary key), alters a to afterType, inserts more rows, runs
// "admin check table" and compares the captured type with the expected one.
// The main table runs under sql_mode STRICT_ALL_TABLES; a second, smaller table
// runs with an empty sql_mode.
func TestGetModifyColumnType(t *testing.T) {
	// testCase is one before/after type pair and the expected modify type.
	type testCase struct {
		beforeType string // type of column a at creation
		afterType  string // type used in "modify column a"
		index      bool   // when true, add index idx_a(a) and primary key(p1, p2)
		tp         byte   // expected value passed to the getModifyColumnType hook
	}

	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")

	tcs := []testCase{
		// integer
		{
			beforeType: "int",
			afterType:  "bigint",
			tp:         ddl.ModifyTypeNoReorg,
		},
		{
			beforeType: "bigint",
			afterType:  "int",
			tp:         ddl.ModifyTypeNoReorgWithCheck,
		},
		{
			beforeType: "bigint",
			afterType:  "int",
			index:      true,
			tp:         ddl.ModifyTypeNoReorgWithCheck,
		},
		{
			beforeType: "bigint",
			afterType:  "bigint unsigned",
			tp:         ddl.ModifyTypeNoReorgWithCheck,
		},
		{
			beforeType: "bigint",
			afterType:  "bigint unsigned",
			index:      true,
			tp:         ddl.ModifyTypeIndexReorg,
		},
		{
			beforeType: "int unsigned",
			afterType:  "bigint",
			tp:         ddl.ModifyTypeNoReorgWithCheck,
		},
		{
			beforeType: "int unsigned",
			afterType:  "bigint",
			index:      true,
			tp:         ddl.ModifyTypeIndexReorg,
		},
		// string
		{
			beforeType: "char(10)",
			afterType:  "char(20)",
			tp:         ddl.ModifyTypeNoReorg,
		},
		{
			beforeType: "char(20)",
			afterType:  "char(10)",
			tp:         ddl.ModifyTypeNoReorgWithCheck,
		},
		{
			beforeType: "char(20) collate utf8mb4_bin",
			afterType:  "char(10) collate utf8mb4_bin",
			index:      true,
			tp:         ddl.ModifyTypeNoReorgWithCheck,
		},
		{
			beforeType: "char(20) collate utf8mb4_general_ci",
			afterType:  "char(10) collate utf8mb4_general_ci",
			index:      true,
			tp:         ddl.ModifyTypeNoReorgWithCheck,
		},
		{
			beforeType: "char(10)",
			afterType:  "varchar(20)",
			tp:         ddl.ModifyTypeNoReorg,
		},
		{
			beforeType: "char(20)",
			afterType:  "varchar(10)",
			tp:         ddl.ModifyTypeNoReorgWithCheck,
		},
		{
			beforeType: "char(20) collate utf8mb4_bin",
			afterType:  "varchar(10) collate utf8mb4_bin",
			index:      true,
			tp:         ddl.ModifyTypeIndexReorg,
		},
		{
			beforeType: "char(20) collate utf8mb4_general_ci",
			afterType:  "varchar(10) collate utf8mb4_general_ci",
			index:      true,
			tp:         ddl.ModifyTypeNoReorgWithCheck,
		},
		{
			beforeType: "varchar(10)",
			afterType:  "varchar(20)",
			tp:         ddl.ModifyTypeNoReorg,
		},
		{
			beforeType: "varchar(20)",
			afterType:  "varchar(10)",
			tp:         ddl.ModifyTypeNoReorgWithCheck,
		},
		{
			beforeType: "varchar(20) collate utf8mb4_bin",
			afterType:  "varchar(10) collate utf8mb4_bin",
			index:      true,
			tp:         ddl.ModifyTypeNoReorgWithCheck,
		},
		{
			beforeType: "varchar(20) collate utf8mb4_general_ci",
			afterType:  "varchar(10) collate utf8mb4_general_ci",
			index:      true,
			tp:         ddl.ModifyTypeNoReorgWithCheck,
		},
		{
			beforeType: "varchar(10)",
			afterType:  "char(20)",
			tp:         ddl.ModifyTypeNoReorgWithCheck,
		},
		{
			beforeType: "varchar(20)",
			afterType:  "char(10)",
			tp:         ddl.ModifyTypeNoReorgWithCheck,
		},
		{
			beforeType: "varchar(20) collate utf8mb4_bin",
			afterType:  "char(10) collate utf8mb4_bin",
			index:      true,
			tp:         ddl.ModifyTypeIndexReorg,
		},
		{
			beforeType: "varchar(20) collate utf8mb4_general_ci",
			afterType:  "char(10) collate utf8mb4_general_ci",
			index:      true,
			tp:         ddl.ModifyTypeNoReorgWithCheck,
		},
		// different collation
		{
			beforeType: "char(20) collate utf8mb4_bin",
			afterType:  "varchar(10) collate utf8_unicode_ci",
			index:      true,
			tp:         ddl.ModifyTypeIndexReorg,
		},
		{
			beforeType: "char(20) collate utf8_unicode_ci",
			afterType:  "varchar(10) collate utf8mb4_bin",
			index:      true,
			tp:         ddl.ModifyTypeIndexReorg,
		},
		{
			beforeType: "varchar(20) collate utf8mb4_bin",
			afterType:  "char(10) collate utf8_unicode_ci",
			index:      true,
			tp:         ddl.ModifyTypeIndexReorg,
		},
		{
			beforeType: "varchar(20) collate utf8_unicode_ci",
			afterType:  "char(10) collate utf8mb4_bin",
			index:      true,
			tp:         ddl.ModifyTypeIndexReorg,
		},
	}

	// Capture the modify type reported by the DDL layer for the latest ALTER.
	var gotTp byte
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/getModifyColumnType", func(tp byte) {
		gotTp = tp
	})

	// runSingle recreates table t for tc, performs the ALTER and compares the
	// captured modify type with tc.tp.
	runSingle := func(t *testing.T, tc testCase) {
		tk.MustExec("drop table if exists t")
		indexPart := ""
		if tc.index {
			indexPart = ", index idx_a(a), primary key(p1, p2)"
		}
		tk.MustExec(fmt.Sprintf("create table t (p1 int, p2 int, a %s%s)", tc.beforeType, indexPart))
		tk.MustExec("insert into t values (1, 1, '1'), (2, 2, '2'), (3, 3, '3')")
		tk.MustExec(fmt.Sprintf("alter table t modify column a %s", tc.afterType))
		// Rows written after the ALTER (including one with a trailing space)
		// must keep the table consistent.
		tk.MustExec("insert into t values (4, 4, '4'), (5, 5, '5'), (6, 6, '6 ')")
		tk.MustExec("admin check table t")
		require.Equal(t, tc.tp, gotTp, "before type: %s, after type: %s", tc.beforeType, tc.afterType)
	}

	tk.MustExec("set sql_mode='STRICT_ALL_TABLES'")
	for _, tc := range tcs {
		runSingle(t, tc)
	}

	// With an empty sql_mode the same narrowing changes are expected to report
	// ModifyTypeReorg.
	tcsNonStrict := []testCase{
		{
			beforeType: "bigint",
			afterType:  "int",
			tp:         ddl.ModifyTypeReorg,
		},
		{
			beforeType: "char(20)",
			afterType:  "char(10)",
			tp:         ddl.ModifyTypeReorg,
		},
		{
			beforeType: "varchar(20)",
			afterType:  "varchar(10)",
			tp:         ddl.ModifyTypeReorg,
		},
		{
			beforeType: "char(20)",
			afterType:  "varchar(10)",
			tp:         ddl.ModifyTypeReorg,
		},
		{
			beforeType: "varchar(20)",
			afterType:  "char(10)",
			tp:         ddl.ModifyTypeReorg,
		},
	}

	tk.MustExec("set sql_mode=''")
	for _, tc := range tcsNonStrict {
		runSingle(t, tc)
	}
}

// TestMultiSchemaModifyColumnWithIndex modifies two indexed columns in a single
// ALTER TABLE statement and checks that every index keeps its name and that each
// index column keeps its name and offset. It then repeats the column changes
// together with index renames and checks the resulting index names.
func TestMultiSchemaModifyColumnWithIndex(t *testing.T) {
	const createSQL = "create table t(c1 bigint, c2 bigint, index i1(c1, c2), index i2(c1))"

	store := testkit.CreateMockStore(t)

	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec(createSQL)

	// Snapshot the table metadata on both sides of the column changes.
	before := external.GetTableByName(t, tk, "test", "t").Meta()
	tk.MustExec("alter table t modify column c1 int, modify column c2 int")
	after := external.GetTableByName(t, tk, "test", "t").Meta()

	require.Len(t, after.Indices, len(before.Indices))
	for idxPos := range before.Indices {
		wantIdx, gotIdx := before.Indices[idxPos], after.Indices[idxPos]
		require.Equal(t, wantIdx.Name, gotIdx.Name)
		require.Len(t, gotIdx.Columns, len(wantIdx.Columns))
		for colPos, wantCol := range wantIdx.Columns {
			gotCol := gotIdx.Columns[colPos]
			require.Equal(t, wantCol.Name, gotCol.Name)
			require.Equal(t, wantCol.Offset, gotCol.Offset)
		}
	}

	// Same column changes, this time interleaved with index renames.
	tk.MustExec("drop table t")
	tk.MustExec(createSQL)
	tk.MustExec("alter table t modify column c1 int, rename index i1 to new1, rename index i2 to new2, modify column c2 int")
	renamed := external.GetTableByName(t, tk, "test", "t").Meta()
	require.Len(t, renamed.Indices, 2)
	for pos, wantName := range []string{"new1", "new2"} {
		require.Equal(t, wantName, renamed.Indices[pos].Name.L)
	}
}

// TestParallelAlterTable issues two ALTER TABLE statements from separate
// sessions while the beforeLoadAndDeliverJobs failpoint callback is held back,
// waits until two DDL jobs are visible, then releases the callback and checks
// the error returned by each statement for several modify-column / add-index
// combinations.
func TestParallelAlterTable(t *testing.T) {
	store := testkit.CreateMockStore(t)
	ctx := context.Background()
	var wg util.WaitGroupWrapper

	// checkParallelDDL recreates table t with createSQL, runs firstSQL and
	// secondSQL concurrently (the second only after the afterGetJobFromLimitCh
	// failpoint has fired at least once), and returns the error of each
	// statement once both sessions have finished.
	checkParallelDDL := func(t *testing.T, createSQL, firstSQL, secondSQL string) (err1, err2 error) {
		var (
			submitted     = make(chan struct{}, 16)
			startSchedule = make(chan struct{})
		)
		// Closing startSchedule unblocks the beforeLoadAndDeliverJobs callback.
		// The Once lets the deferred call act as a safety net when an assertion
		// below aborts the test, without double-closing on the success path.
		var releaseOnce sync.Once
		releaseScheduler := func() {
			releaseOnce.Do(func() { close(startSchedule) })
		}
		defer releaseScheduler()

		tk := testkit.NewTestKit(t, store)
		tk.MustExec("use test")
		tk.MustExec("drop table if exists t")
		tk.MustExec(createSQL)

		testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeLoadAndDeliverJobs", func() {
			<-startSchedule
		})

		testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterGetJobFromLimitCh", func(_ chan *ddl.JobWrapper) {
			submitted <- struct{}{}
		})
		wg.Run(func() {
			tk1 := testkit.NewTestKit(t, store)
			tk1.MustExec("use test")
			_, err1 = tk1.Exec(firstSQL)
		})
		wg.Run(func() {
			// wait until first ddl is submitted
			<-submitted
			tk2 := testkit.NewTestKit(t, store)
			tk2.MustExec("use test")
			_, err2 = tk2.Exec(secondSQL)
		})

		// The condition runs on a goroutine owned by require.Eventually, so it
		// must not call FailNow-style assertions itself. Record the error, stop
		// polling, and assert on the test goroutine instead.
		var pollErr error
		require.Eventually(t, func() bool {
			gotJobs, err := ddl.GetAllDDLJobs(ctx, tk.Session())
			if err != nil {
				pollErr = err
				return true
			}
			return len(gotJobs) == 2
		}, 10*time.Second, 100*time.Millisecond)
		require.NoError(t, pollErr)

		releaseScheduler()
		wg.Wait()
		return err1, err2
	}

	t.Run("modify column then add index", func(t *testing.T) {
		err1, err2 := checkParallelDDL(t,
			"create table t(id int, c1 char(16))",
			"alter table t modify column c1 text(255)",
			"alter table t add index idx_c1(c1)",
		)
		require.NoError(t, err1)
		require.Error(t, err2)
	})

	t.Run("add index then modify column", func(t *testing.T) {
		err1, err2 := checkParallelDDL(t,
			"create table t(id int, c1 char(16))",
			"alter table t add index idx_c1(c1)",
			"alter table t modify column c1 text(255)",
		)
		require.NoError(t, err1)
		require.Error(t, err2)
	})

	t.Run("add index with prefix length then modify column", func(t *testing.T) {
		err1, err2 := checkParallelDDL(t,
			"create table t(id int, c1 char(16))",
			"alter table t add index idx_c1(c1(10))",
			"alter table t modify column c1 text(255)",
		)
		require.NoError(t, err1)
		require.NoError(t, err2)
	})

	t.Run("modify column then add index with prefix length", func(t *testing.T) {
		err1, err2 := checkParallelDDL(t,
			"create table t(id int, c1 char(16))",
			"alter table t modify column c1 text(255)",
			"alter table t add index idx_c1(c1(10))",
		)
		require.NoError(t, err1)
		require.NoError(t, err2)
	})
}

// TestModifyIntegerColumn covers modifying integer column types. With respect
// to signedness there are four kinds of changes:
//
//  1. signed to signed
//  2. signed to unsigned
//  3. unsigned to unsigned
//  4. unsigned to signed
//
// For each kind of change, the test walks the combinations of old and new
// integer types of different byte sizes. For example, for "signed to signed"
// it tests:
//
//	bigint    -> int, mediumint, smallint, tinyint
//	int       -> mediumint, smallint, tinyint
//	mediumint -> smallint, tinyint
//	smallint  -> tinyint
//
// For each combination, it checks both the values that are expected to make the
// modification fail and the values that are expected to let it succeed.
func TestModifyIntegerColumn(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	// reorgType holds the value most recently passed to the
	// getModifyColumnType failpoint callback.
	var reorgType byte
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/getModifyColumnType", func(tp byte) {
		reorgType = tp
	})

	// Each entry is {max, min}. Fixed-width element types keep the 64-bit
	// limits representable regardless of the platform's int size.
	maxMinSignedVal := map[string][]int64{
		"bigint":    {math.MaxInt64, math.MinInt64},
		"int":       {math.MaxInt32, math.MinInt32},
		"mediumint": {1<<23 - 1, -1 << 23},
		"smallint":  {math.MaxInt16, math.MinInt16},
		"tinyint":   {math.MaxInt8, math.MinInt8},
	}

	maxMinUnsignedVal := map[string][]uint64{
		"bigint unsigned":    {math.MaxUint64, 0},
		"int unsigned":       {math.MaxUint32, 0},
		"mediumint unsigned": {1<<24 - 1, 0},
		"smallint unsigned":  {math.MaxUint16, 0},
		"tinyint unsigned":   {math.MaxUint8, 0},
	}

	// recreateTable drops table t and creates it again with a single column a
	// of type colTp.
	recreateTable := func(colTp string) {
		tk.MustExec("drop table if exists t")
		tk.MustExec(fmt.Sprintf("create table t(a %s)", colTp))
	}

	// failedValue inserts each value on its own, expects the column change to
	// newColTp to be rejected with a truncation error, and empties the table
	// again before trying the next value.
	failedValue := func(insertVal []string, newColTp string) {
		for _, val := range insertVal {
			tk.MustExec(fmt.Sprintf("insert into t values(%s)", val))
			err := tk.ExecToErr(fmt.Sprintf("alter table t modify column a %s", newColTp))
			// ErrorContains also fails cleanly when err is nil, instead of
			// panicking on err.Error().
			require.ErrorContains(t, err, "Data truncated for column 'a'",
				"value %s, new type %s", val, newColTp)
			tk.MustExec("delete from t")
		}
	}

	// successValue inserts the given value list, expects the column change to
	// newColTp to succeed, and checks the type reported through the failpoint
	// against expectReorgTp.
	successValue := func(insertVal, newColTp string, expectReorgTp byte) {
		tk.MustExec(fmt.Sprintf("insert into t values %s", insertVal))
		tk.MustExec(fmt.Sprintf("alter table t modify column a %s", newColTp))
		require.Equal(t, expectReorgTp, reorgType, "values %s, new type %s", insertVal, newColTp)
	}

	signed2Signed := func(oldColTp, newColTp string, expectReorgTp byte) {
		maxValOfNewCol, minValOfNewCol := maxMinSignedVal[newColTp][0], maxMinSignedVal[newColTp][1]
		maxValOfOldCol, minValOfOldCol := maxMinSignedVal[oldColTp][0], maxMinSignedVal[oldColTp][1]
		recreateTable(oldColTp)

		// [maxValOfNewCol+1, maxValOfOldCol] fail
		failedValue([]string{
			fmt.Sprintf("%d", maxValOfNewCol+1),
			fmt.Sprintf("%d", maxValOfOldCol),
		}, newColTp)

		// [minValOfOldCol, minValOfNewCol-1] fail
		failedValue([]string{
			fmt.Sprintf("%d", minValOfNewCol-1),
			fmt.Sprintf("%d", minValOfOldCol),
		}, newColTp)

		// [minValOfNewCol, maxValOfNewCol] pass
		successValue(fmt.Sprintf("(%d), (%d), (0)", maxValOfNewCol, minValOfNewCol), newColTp, expectReorgTp)
	}

	unsigned2Unsigned := func(oldColTp, newColTp string, expectReorgTp byte) {
		maxValOfNewCol, minValOfNewCol := maxMinUnsignedVal[newColTp][0], maxMinUnsignedVal[newColTp][1]
		maxValOfOldCol := maxMinUnsignedVal[oldColTp][0]
		recreateTable(oldColTp)

		// [maxValOfNewCol+1, maxValOfOldCol] fail
		failedValue([]string{
			fmt.Sprintf("%d", maxValOfNewCol+1),
			fmt.Sprintf("%d", maxValOfOldCol),
		}, newColTp)

		// [0, maxValOfNewCol] pass
		successValue(fmt.Sprintf("(%d), (%d), (1)", maxValOfNewCol, minValOfNewCol), newColTp, expectReorgTp)
	}

	signed2Unsigned := func(oldColTp, newColTp string, expectReorgTp byte, oldColIdx, newColIdx int) {
		maxValOfOldCol, minValOfOldCol := maxMinSignedVal[oldColTp][0], maxMinSignedVal[oldColTp][1]
		maxValOfNewCol := maxMinUnsignedVal[newColTp][0]
		recreateTable(oldColTp)

		// [minValOfOldCol, -1] fail
		failedValue([]string{
			"-1",
			fmt.Sprintf("%d", minValOfOldCol),
		}, newColTp)

		// The type lists are ordered from widest to narrowest, so a larger
		// index for the new type means the unsigned target is narrower than
		// the signed source and positive values can overflow it too.
		if oldColIdx < newColIdx {
			// [maxValOfNewCol+1, maxValOfOldCol] fail
			failedValue([]string{
				fmt.Sprintf("%d", maxValOfNewCol+1),
				fmt.Sprintf("%d", maxValOfOldCol),
			}, newColTp)
		}

		// [0, min(maxValOfOldCol, maxValOfNewCol)] pass
		successValue(fmt.Sprintf("(%d), (1), (0)", min(uint64(maxValOfOldCol), maxValOfNewCol)), newColTp, expectReorgTp)
	}

	unsigned2Signed := func(oldColTp, newColTp string, expectReorgTp byte) {
		maxValOfNewCol := maxMinSignedVal[newColTp][0]
		maxValOfOldCol := maxMinUnsignedVal[oldColTp][0]
		recreateTable(oldColTp)

		// [maxValOfNewCol+1, maxValOfOldCol] fail
		failedValue([]string{
			fmt.Sprintf("%d", uint64(maxValOfNewCol)+1),
			fmt.Sprintf("%d", maxValOfOldCol),
		}, newColTp)

		// [0, maxValOfNewCol] pass
		successValue(fmt.Sprintf("(%d), (1), (0)", maxValOfNewCol), newColTp, expectReorgTp)
	}

	signedTp := []string{"bigint", "int", "mediumint", "smallint", "tinyint"}
	unsignedTp := []string{"bigint unsigned", "int unsigned", "mediumint unsigned", "smallint unsigned", "tinyint unsigned"}
	for oldColIdx := range signedTp {
		// 1. signed -> signed
		// bigint -> int, mediumint, smallint, tinyint; int -> mediumint, smallint, tinyint; ...
		for newColIdx := oldColIdx + 1; newColIdx < len(signedTp); newColIdx++ {
			signed2Signed(signedTp[oldColIdx], signedTp[newColIdx], ddl.ModifyTypeNoReorgWithCheck)
		}
		// 2. signed -> unsigned
		// every signed type is converted to every unsigned type, wider or narrower.
		for newColIdx := range unsignedTp {
			signed2Unsigned(signedTp[oldColIdx], unsignedTp[newColIdx], ddl.ModifyTypeNoReorgWithCheck, oldColIdx, newColIdx)
		}
	}
	for oldColIdx := range unsignedTp {
		// 3. unsigned -> unsigned
		// bigint unsigned -> int unsigned, mediumint unsigned, smallint unsigned, tinyint unsigned; int unsigned -> mediumint unsigned, smallint unsigned, tinyint unsigned; ...
		for newColIdx := oldColIdx + 1; newColIdx < len(unsignedTp); newColIdx++ {
			unsigned2Unsigned(unsignedTp[oldColIdx], unsignedTp[newColIdx], ddl.ModifyTypeNoReorgWithCheck)
		}
		// 4. unsigned -> signed
		// bigint unsigned -> bigint, int, mediumint, smallint, tinyint; int unsigned -> int, mediumint, smallint, tinyint; ...
		for newColIdx := oldColIdx; newColIdx < len(signedTp); newColIdx++ {
			unsigned2Signed(unsignedTp[oldColIdx], signedTp[newColIdx], ddl.ModifyTypeNoReorgWithCheck)
		}
	}
}

// TestModifyStringColumn changes a char/varchar column to another char/varchar
// type with one row already stored. Depending on the stored value, it expects
// either a "Data truncated" error or a successful change whose modify type, as
// reported through the getModifyColumnType failpoint, matches the expectation.
func TestModifyStringColumn(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	// reorgType holds the value most recently passed to the failpoint callback.
	var reorgType byte
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/getModifyColumnType", func(tp byte) {
		reorgType = tp
	})
	type testCase struct {
		oldColTp  string
		newColTp  string
		insertVal string
		// pass reports whether the column change is expected to succeed.
		pass bool
		// expectedReorgTp is only checked when pass is true. Cases that leave
		// it at ddl.ModifyTypeNone expect ddl.ModifyTypeNoReorgWithCheck.
		expectedReorgTp byte
	}
	noPaddingStrLen5 := strings.Repeat("a", 5)
	noPaddingStrLen15 := strings.Repeat("a", 15)
	// The "padding" values are a single letter followed by trailing spaces.
	paddingStrLen5 := "a" + strings.Repeat(" ", 4)
	paddingStrLen15 := "a" + strings.Repeat(" ", 14)

	cases := []testCase{
		{
			oldColTp:  "char(20)",
			newColTp:  "char(10)",
			insertVal: noPaddingStrLen15,
		},
		{
			oldColTp:  "char(20)",
			newColTp:  "char(10)",
			insertVal: noPaddingStrLen5,
			pass:      true,
		},
		{
			oldColTp:  "varchar(20)",
			newColTp:  "varchar(10)",
			insertVal: noPaddingStrLen15,
		},
		{
			oldColTp:  "varchar(20)",
			newColTp:  "varchar(10)",
			insertVal: noPaddingStrLen5,
			pass:      true,
		},
		{
			oldColTp:  "char(20)",
			newColTp:  "varchar(10)",
			insertVal: noPaddingStrLen15,
		},
		{
			oldColTp:  "char(20)",
			newColTp:  "varchar(10)",
			insertVal: noPaddingStrLen5,
			pass:      true,
		},
		{
			oldColTp:        "varchar(10)",
			newColTp:        "char(20)",
			insertVal:       paddingStrLen5,
			pass:            true,
			expectedReorgTp: ddl.ModifyTypeReorg,
		},
		{
			oldColTp:  "varchar(10)",
			newColTp:  "char(20)",
			insertVal: noPaddingStrLen5,
			pass:      true,
		},
		{
			oldColTp:        "varchar(20)",
			newColTp:        "char(10)",
			insertVal:       paddingStrLen5,
			pass:            true,
			expectedReorgTp: ddl.ModifyTypeReorg,
		},
		{
			oldColTp:        "varchar(20)",
			newColTp:        "char(10)",
			insertVal:       paddingStrLen15,
			pass:            true,
			expectedReorgTp: ddl.ModifyTypeReorg,
		},
		{
			oldColTp:  "varchar(20)",
			newColTp:  "char(10)",
			insertVal: noPaddingStrLen15,
		},
		{
			oldColTp:  "varchar(20)",
			newColTp:  "char(10)",
			insertVal: noPaddingStrLen5,
			pass:      true,
		},
	}

	for _, tc := range cases {
		tk.MustExec("drop table if exists t")
		tk.MustExec(fmt.Sprintf("create table t(a %s)", tc.oldColTp))
		tk.MustExec(fmt.Sprintf("insert into t values('%s')", tc.insertVal))
		err := tk.ExecToErr(fmt.Sprintf("alter table t modify column a %s", tc.newColTp))
		if !tc.pass {
			// ErrorContains also fails cleanly when err is nil, instead of
			// panicking on err.Error().
			require.ErrorContains(t, err, "Data truncated for column 'a'",
				"%s -> %s, value %q", tc.oldColTp, tc.newColTp, tc.insertVal)
			continue
		}
		require.NoError(t, err, "%s -> %s, value %q", tc.oldColTp, tc.newColTp, tc.insertVal)
		expectedReorgTp := tc.expectedReorgTp
		if expectedReorgTp == ddl.ModifyTypeNone {
			expectedReorgTp = ddl.ModifyTypeNoReorgWithCheck
		}
		require.Equal(t, expectedReorgTp, reorgType,
			"%s -> %s, value %q", tc.oldColTp, tc.newColTp, tc.insertVal)
	}
}

// TestModifyColumnWithDifferentCollation shrinks an indexed char/varchar(32)
// column to char/varchar(23) across every pairing of the listed types and
// collations (except a type/collation paired with itself), while a second
// session inserts and deletes rows from the beforeRunOneJobStep failpoint
// callback. Each run ends with "admin check table".
func TestModifyColumnWithDifferentCollation(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)

	runSingleTest := func(t *testing.T, oldColTp, newColTp string) {
		tk.MustExec("use test")
		tk.MustExec("drop table if exists t1")
		tk.MustExec(fmt.Sprintf(`
			CREATE TABLE t1 (
			c1 int NOT NULL DEFAULT '1',
			c2 int NOT NULL DEFAULT '1',
			c3 %s,
			PRIMARY KEY (c1, c2),
			KEY i1 (c3)
			) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
		`, oldColTp))

		// Seed rows whose c3 value carries a trailing space.
		for row := 0; row < 10; row++ {
			tk.MustExec(fmt.Sprintf("insert into t1 (c1, c3) values (%d, 'space%d ')", row, row))
		}

		// Each callback invocation inserts one new row and deletes one seeded
		// row, advancing both counters.
		nextInsert, nextDelete := 32, 0
		testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeRunOneJobStep", func(_ *model.Job) {
			dmlTK := testkit.NewTestKit(t, store)
			dmlTK.MustExec("use test")
			// Test data consistency for insert/delete check during reorg.
			insertSQL := fmt.Sprintf("insert into t1 (c1, c3) values ('%d', 'space%d   ')", nextInsert, nextInsert)
			if err := dmlTK.ExecToErr(insertSQL); err != nil {
				// The only possible error is data truncation error during modify column.
				require.Contains(t, err.Error(), "data truncation error during modify column")
			}
			dmlTK.MustExec(fmt.Sprintf("delete from t1 where c1 = %d", nextDelete))
			nextDelete++
			nextInsert++
		})

		tk.MustExec(fmt.Sprintf("alter table t1 modify column c3 %s", newColTp))
		require.True(t, nextDelete > 0, "failpoint should be triggered")
		tk.MustExec("admin check table t1;")
	}

	baseTps := []string{"char", "varchar"}
	collations := []string{"utf8mb4_bin", "utf8_unicode_ci", "utf8mb4_general_ci"}
	oldTps := make([]string, 0, len(baseTps)*len(collations))
	newTps := make([]string, 0, len(baseTps)*len(collations))
	for _, base := range baseTps {
		for _, coll := range collations {
			oldTps = append(oldTps, fmt.Sprintf("%s(32) collate %s", base, coll))
			newTps = append(newTps, fmt.Sprintf("%s(23) collate %s", base, coll))
		}
	}

	for from, oldColTp := range oldTps {
		for to, newColTp := range newTps {
			// Equal positions mean the same base type and collation; skip those.
			if from != to {
				t.Run(fmt.Sprintf("%s -> %s", oldColTp, newColTp), func(t *testing.T) {
					runSingleTest(t, oldColTp, newColTp)
				})
			}
		}
	}
}
